package spark.streaming.readkafka

import java.net.URI
import java.text.SimpleDateFormat
import java.util.Date

import com.typesafe.config.ConfigFactory
import kafka.serializer._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SaveMode.Append
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}

object StreamProcessor {
  // Date suffix (yyyy-MM-dd) evaluated ONCE at object initialization and used to
  // build per-day HDFS paths. NOTE(review): a driver that runs across midnight
  // keeps the start date — confirm this is intended.
  val stringDate = new SimpleDateFormat("yyyy-MM-dd").format(new Date(System.currentTimeMillis()))

  def main(args: Array[String]): Unit = {

    implicit val conf = ConfigFactory.load

    val spark = SparkSession.builder()
      .master(conf.getString("spark.master"))
      .appName("readKafkaPersistToHdfs")
      .config("spark.streaming.backpressure.enabled", "true")
      .config("spark.sql.parquet.compression.codec", "snappy")
      .config("spark.sql.parquet.mergeSchema", "true")
      .config("spark.sql.parquet.binaryAsString", "true")
      .config("spark.sql.shuffle.partitions", "100")
      .getOrCreate()

//    spark.sparkContext.setLogLevel("warn")
    val sinkTmpPath = ConfPicks.HDFS_URL + "/tmp/spark_tmp" + stringDate
    writeDataToHdfs(sinkTmpPath, spark)
//    readDataAndMergeFinalSinkAFileToHdfs(sinkTmpPath, spark)
//    readDataFromHdfsAndCount(spark)
//    writeDataToLocal(spark)
  }

  /**
    * Streams Kafka messages into the given HDFS directory as parquet.
    * One batch of files is produced per 10-minute window, which accumulates
    * many small files; see [[readDataAndMergeFinalSinkAFileToHdfs]] for the
    * compaction step.
    *
    * @param sinkTmpPath HDFS directory the windowed parquet output is appended to
    * @param spark       active SparkSession (its SparkContext backs the StreamingContext)
    */
  def writeDataToHdfs(sinkTmpPath: String, spark: SparkSession) = {
    import spark.implicits._
    val ssc = new StreamingContext(spark.sparkContext, Seconds(10))
    val kafkaParams = Map(
      "metadata.broker.list" -> ConfPicks.KAFKA_BROKER_LIST,
      "fetch.message.max.bytes" -> "10485760")

    // BUG FIX: the original created an immutable Set[String]() and called
    // `topicSet.+(topics(i))` in a loop, discarding each returned set — so an
    // EMPTY topic set was handed to Kafka and nothing was consumed. Build the
    // set directly from the comma-separated config value instead.
    val topicSet = ConfPicks.KAFKA_TOPICS.split(",").toSet

    val kafkaDStream = KafkaUtils
      .createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicSet).map(_._2)

    // 10-minute window with a 10-minute slide: exactly one RDD every 10 minutes.
    val winDstream = kafkaDStream.window(Minutes(10), Minutes(10))
    winDstream.map(value => {
      // NOTE(review): Row(value).toString() renders as "[value]" (bracketed by
      // Row.toString) — confirm downstream consumers expect that framing.
      Row(value).toString()
    }).foreachRDD(rdd => {
      // Skip empty windows so no zero-row parquet files are written.
      if (!rdd.isEmpty()) {
        val valueDS = spark.createDataset(rdd)
        valueDS.coalesce(10).write.mode(Append).parquet(sinkTmpPath)
      }
    })
    ssc.start()
    ssc.awaitTermination()
  }

  /**
    * Debug helper: prints lines received from a local socket stream, one
    * 1-minute window at a time.
    *
    * @param spark active SparkSession (its SparkContext backs the StreamingContext)
    */
  def writeDataToLocal(spark: SparkSession) = {
    import spark.implicits._
    // Local output directory (currently unused by the active code path below).
    val sinkTmpPath = "local_data"

    val ssc = new StreamingContext(spark.sparkContext, Seconds(10))

    val inputDsteam = ssc.socketTextStream(ConfPicks.HOST, 9998)

    // 1-minute window, 1-minute slide: one RDD per minute.
    val winDstream = inputDsteam.window(Minutes(1), Minutes(1))

    winDstream.foreachRDD(rdd => {
      rdd.foreach(value => println("line is : " + value))
    })
    // A previous Kafka-based variant of this method (createDirectStream +
    // show()/json() sink) was kept here commented out; it carried the same
    // discarded-immutable-Set topic bug fixed in writeDataToHdfs. Removed as
    // dead code — see writeDataToHdfs for the working Kafka path.
    ssc.start()
    ssc.awaitTermination()
  }

  /**
    * Compacts the small parquet files under sinkTmpPath into a single file
    * appended to the final HDFS path, then deletes sinkTmpPath and its data.
    *
    * @param sinkTmpPath HDFS directory containing the small files to merge (deleted afterwards)
    * @param spark       active SparkSession
    */
  def readDataAndMergeFinalSinkAFileToHdfs(sinkTmpPath: String, spark: SparkSession): Unit = {
    val finalSavePath = ConfPicks.HDFS_URL + "/tmp/kafkaData/spark" + stringDate
    val path = new Path(sinkTmpPath)
    val sqlContext = spark.sqlContext
    // BUG FIX: read the small files from sinkTmpPath — the directory this
    // method is documented to merge and then delete. The original read from
    // finalSavePath, appending that path's own data back onto itself while the
    // tmp data was deleted unmerged.
    val df = sqlContext.read.parquet(sinkTmpPath)

    val dataResFrame = df.select(df("value"))

    dataResFrame.coalesce(1).write.mode(Append).parquet(finalSavePath)

    // When submitting on a cluster, the hadoop conf can instead be obtained via:
    //    val hadoopConf = sc.hadoopConfiguration
    //    val hdfs = FileSystem.get(hadoopConf)
    try {
      val hdfs = FileSystem.get(new URI(ConfPicks.HDFS_URL), new Configuration())
      // Recursive delete of the now-merged tmp directory; best-effort cleanup,
      // so failures are logged rather than propagated.
      if (hdfs.exists(path)) {
        hdfs.delete(path, true)
      }
    } catch {
      case e: Exception => println("exception caught: " + e)
    }
  }

  /**
    * Reads today's tmp parquet directory from HDFS and prints the total number
    * of messages in its "value" column. Debug/verification helper.
    *
    * @param spark active SparkSession
    */
  def readDataFromHdfsAndCount(spark: SparkSession): Unit = {
    val sinkTmpPath = ConfPicks.HDFS_URL + "/tmp/spark_tmp" + stringDate
    val sqlContext = spark.sqlContext
    val df = sqlContext.read.parquet(sinkTmpPath)

    val dataResFrame = df.select(df("value"))
    val msgCount = dataResFrame.count()

    println("total msg is :" + msgCount)
  }
}

/**
  * Immutable record for a device message.
  *
  * @param msgType      message type discriminator
  * @param did          device id
  * @param gid          group id
  * @param ip           client IP address as a string
  * @param thirdCloudId third-party cloud identifier
  * @param registered   registration flag, kept as a string (NOTE(review): consider Boolean — confirm upstream format)
  */
final case class DeviceData(msgType: String, did: String, gid: String, ip: String, thirdCloudId: String, registered: String)
